DataAPIでRedshiftにJSONデータを格納して、Tableauで可視化してみた
データアナリティクス事業本部の鈴木です。
先日、RedshiftにJSONを格納し、Tableauで可視化する記事を公開しました。
今度はLambdaからData API for Redshift(以降「DataAPI」)を使ってJSONを含んだレコードを同じテーブルに追加し、Tableauで可視化してみました。
前回記事はこちらになります。
今回の記事では、SUPER型のカラムが存在するテーブルがあったとして、DataAPIからレコードを追加できるのかに関心がありました。また、変更後のデータをTableauで可視化するために必要な項目も併せて確認しました。
環境準備
まず、前回記事で作成したRedshiftのリソースについて振り返ります。
テーブル
irisというテーブルを作成しました。sampleidというカラムにデータのIDを、iris_jsonというSUPER型のカラムにJSONを格納しました。iris_jsonは、具体的にはscikit-learnのIris Dataset(いわゆるアイリスデータ)をJSONに変換したものでした。
CREATE TABLE iris( sampleid INT NOT NULL, iris_json SUPER );
マテリアライズドビュー
Tableauからの参照用に、iris_materialized_viewというマテリアライズドビューを作成していました。TableauがJSONデータを読み出せるように、マテリアライズドビューの定義内でドット記法を使ってJSONの属性を展開しました。
CREATE MATERIALIZED VIEW iris_materialized_view AS SELECT sampleid, iris_json.sepal_length, iris_json.sepal_width, iris_json.petal_length, iris_json.petal_width, iris_json.species FROM iris;
続いて、今回の検証の構成を説明します。
検証した構成
今回は、RedshiftがあるAWSアカウントとは違うAWSアカウントのLambdaから、DataAPIを使ってデータを追加しました。構成は以下のようなイメージです。
Lambdaは以下の2つを作成しました。
- データを挿入するLambda
- マテリアライズドビューをリフレッシュするLambda
実際には、2つのLambdaをAWS Step Functionsなどで実行するような想定です。
また、DataAPIにアクセスするためのIAMロールを準備しておきます。今回はクロスアカウントで行うので、以下の2つのロールを用意します。
- LambdaにDataAPIへのアクセスを許可する、Redshiftアカウント側のIAMロール。
- 上記IAMロールへのAssumeRoleを許可する、Lambdaアカウント側のIAMロール。
具体的な設定手順は、弊社平野の記事をご参照ください。
DataAPIで楽々、Lambda関数から別アカウントのRedshiftにクエリを投げてみた。 | DevelopersIO
やってみる
データの挿入
まず、1つ目のLambda関数を作成します。以下のコードをデプロイします。
コードは上記記事で紹介されたコードを参考にしています。今回はJSONを含んだレコードを新しく挿入したいので、SQL文を対応したものに変更しておきます。
# https://dev.classmethod.jp/articles/redshift-datapia-from-lambda-crossaccount/を参考にしました。 import json import time import boto3 from boto3.session import Session # Redshift接続情報 CLUSTER_NAME='cluster_name' DATABASE_NAME='sample_db' DB_USER='dbuser' ROLE_ARN='arn:aws:iam::XXXXXXXXXXXX:role/AllowRedshiftDataApiCrossAccount' # 実行するSQLを設定 sql = ''' INSERT INTO iris VALUES (151, JSON_PARSE('{"sepal_length":10.1,"sepal_width":7.5,"petal_length":2.4,"petal_width":1.2,"species":"dummy"}')); ''' def create_session(): sts_client = boto3.client('sts', endpoint_url='https://sts.ap-northeast-1.amazonaws.com') credentials = sts_client.assume_role(RoleArn=ROLE_ARN, RoleSessionName='RedsfhitDataAPI') accesskey = credentials['Credentials']['AccessKeyId'] secretkey = credentials['Credentials']['SecretAccessKey'] session_token = credentials['Credentials']['SessionToken'] session = Session(aws_access_key_id=accesskey, aws_secret_access_key=secretkey, aws_session_token=session_token) return session def lambda_handler(event, context): # Redshiftにクエリを投げる。非同期なのですぐ返ってくる session = create_session() data_client = session.client('redshift-data') result = data_client.execute_statement( ClusterIdentifier=CLUSTER_NAME, Database=DATABASE_NAME, DbUser=DB_USER, Sql=sql, ) # 実行IDを取得 id = result['Id'] # クエリが終わるのを待つ statement = '' status = '' while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED': statement = data_client.describe_statement(Id=id) status = statement['Status'] time.sleep(1) # 結果の表示 if status == 'FINISHED': if int(statement['ResultSize']) > 0: # select文等なら戻り値を表示 statement = data_client.get_statement_result(Id=id) print(json.dumps(statement['Records'])) else: # 戻り値がないものはFINISHだけ出力して終わり print('QUERY FINSHED') elif status == 'FAILED': # 失敗時 print('QUERY FAILED\n{}'.format(statement)) elif status == 'ABORTED': # ユーザによる停止時 print('QUERY ABORTED: The query run was stopped by the user.')
SQL文で挿入したデータは、結果が分かりやすいよう、以下のように"species"キーのバリューが"dummy"であるJSONを用意しました。
{ "sepal_length":10.1, "sepal_width":7.5, "petal_length":2.4, "petal_width":1.2, "species":"dummy" }
半構造化データのRedshiftへのロードには「INSERTまたはUPDATEを使用する方法」と「COPYを使用する方法」がありますが、今回は簡単のため、前者で行いました。
Lambdaを実行し、Database Managerからテーブルを参照すると、IDが151のデータが追加されていることが確認できました。
マテリアライズドビューのリフレッシュ
続いて2つ目のLambdaを作成します。この関数はマテリアライズドビューのリフレッシュを行います。マテリアライズドビューは、ソーステーブルの変更を反映させるために、リフレッシュする必要があります。
1つ目のLambdaにデプロイしたコードと同じものを用意し、SQL文の内容を以下のように修正します。修正したコードを2つ目のLambdaにデプロイします。
(略) # 実行するSQLを設定 sql = ''' REFRESH MATERIALIZED VIEW iris_materialized_view; ''' (略)
Lambdaを実行すると、マテリアライズドビューにも新しく追加したレコードが反映されました。
Tableauでの可視化
最後に、新しく追加したレコードを含めて、Tableauで可視化してみます。
Tableau Desktopを起動し、Redshiftのiris_materialized_viewマテリアライズドビューに接続します。
iris_materialized_viewからデータを読み取って、散布図を描いてみます。
151番目のデータとして、○のマーカーが表示されており、Lambdaから投入したデータが可視化できていることが分かりました。
まとめ
DataAPIから、SUPER型のカラムにJSONを格納することができました。また、マテリアライズドビューをリフレッシュすることで、Tableauから追加したデータを参照できました。
今回はかなり簡易的な構成で行っていて、実際には、
- どのくらいまとめてLambdaからデータを投入するか
- どのくらいの頻度でマテリアライズドビューをリフレッシュするか
など、まだまだ検討することがたくさんありそうでした。
以上、同じような検討をされている方の参考になれば幸いです。